package cn.com.ry.framework.monitormeter.metric.mq.rocketmq;

import cn.com.ry.framework.monitormeter.instrument.linklog.LinkLogMeterRegistry;
import cn.com.ry.framework.monitormeter.metric.mq.MqConsumerBean;
import cn.com.ry.framework.monitormeter.util.Assert;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.ToDoubleFunction;

public class ApacheRocketMqMetrics implements MeterBinder {
    private static final Logger logger = LoggerFactory.getLogger(ApacheRocketMqMetrics.class);

    private final ConcurrentMap<String, DefaultMQAdminExt> cache = new ConcurrentHashMap<String, DefaultMQAdminExt>();
    private DefaultMQAdminExt defaultMQAdminExt;
    private String namesrvAddress;
    private String name;
    private List<MqConsumerBean> mqConsumerBeanList;

    public ApacheRocketMqMetrics(String name, String namesrvAddress, String accessKey, String secretKey, List<MqConsumerBean> mqConsumerBeanList) {
        Assert.notEmpty(name, "name不能为空");
        Assert.notEmpty(accessKey, "Build DefaultMQAdminExt error, accessKey is null");
        Assert.notEmpty(secretKey, "Build DefaultMQAdminExt error, secretKey is null");
        this.name = name;
        this.mqConsumerBeanList = mqConsumerBeanList;
        createDefaultMQAdminExt(namesrvAddress, true, accessKey, secretKey);
    }

    public ApacheRocketMqMetrics(String name, String namesrvAddress, List<MqConsumerBean> mqConsumerBeanList) {
        Assert.notEmpty(name, "name不能为空");
        this.name = name;
        this.mqConsumerBeanList = mqConsumerBeanList;
        createDefaultMQAdminExt(namesrvAddress, false, null, null);

    }

    public void createDefaultMQAdminExt(String namesrvAddress, boolean enableACL, String accessKey, String secretKey) {
        Assert.notEmpty(namesrvAddress, "Build DefaultMQAdminExt error, namesrv is null");
        defaultMQAdminExt = cache.get(namesrvAddress);
        if (defaultMQAdminExt == null) {
            this.namesrvAddress = namesrvAddress;
            if (enableACL) {
                defaultMQAdminExt = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)), 5000L);
            } else {
                defaultMQAdminExt = new DefaultMQAdminExt(5000L);
            }
            defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());
            defaultMQAdminExt.setNamesrvAddr(namesrvAddress);
            try {
                defaultMQAdminExt.start();
            } catch (MQClientException ex) {
                logger.error(String.format("init default admin error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
            }
            this.cache.put(namesrvAddress, defaultMQAdminExt);
        }
    }

    @Override
    public void bindTo(MeterRegistry registry) {
        if (defaultMQAdminExt != null && mqConsumerBeanList != null && mqConsumerBeanList.size() > 0) {
            if (registry instanceof LinkLogMeterRegistry) {
                for (MqConsumerBean rocketMqConsumerBean : mqConsumerBeanList) {
                    String topic = rocketMqConsumerBean.getTopic();
                    String consumerGroup = rocketMqConsumerBean.getConsumerGroup();

                    //TPS
                    Gauge.builder("mq.rocketmq." + name + "." + topic + "." + consumerGroup + ".tps", defaultMQAdminExt, new ToDoubleFunction<DefaultMQAdminExt>() {
                        @Override
                        public double applyAsDouble(DefaultMQAdminExt defaultMQAdminExt) {
                            double value = 0;
                            try {
                                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
                                value = consumeStats.getConsumeTps();
                            } catch (RemotingException e) {
                                e.printStackTrace();
                            } catch (MQClientException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (MQBrokerException e) {
                                e.printStackTrace();
                            }
                            return value;
                        }
                    })
                            .tags(Tags.of("topic", topic))
                            .description("TPS监控，单位 数量/秒。")
                            .register(registry);


                    Gauge.builder("mq.rocketmq." + name + "." + topic + "." + consumerGroup + ".totalDiff", defaultMQAdminExt, new ToDoubleFunction<DefaultMQAdminExt>() {
                        @Override
                        public double applyAsDouble(DefaultMQAdminExt defaultMQAdminExt) {
                            double value = 0;
                            try {
                                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
                                value = consumeStats.computeTotalDiff();
                            } catch (RemotingException e) {
                                e.printStackTrace();
                            } catch (MQClientException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (MQBrokerException e) {
                                e.printStackTrace();
                            }
                            return value;
                        }
                    })
                            .tags(Tags.of("topic", topic))
                            .description("积压量")
                            .register(registry);
                }
            } else if (registry instanceof PrometheusMeterRegistry) {
                for (MqConsumerBean rocketMqConsumerBean : mqConsumerBeanList) {
                    String topic = rocketMqConsumerBean.getTopic();
                    String consumerGroup = rocketMqConsumerBean.getConsumerGroup();

                    //TPS
                    Gauge.builder("mq.rocketmq.tps", defaultMQAdminExt, new ToDoubleFunction<DefaultMQAdminExt>() {
                        @Override
                        public double applyAsDouble(DefaultMQAdminExt defaultMQAdminExt) {
                            double value = 0;
                            try {
                                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
                                value = consumeStats.getConsumeTps();
                            } catch (RemotingException e) {
                                e.printStackTrace();
                            } catch (MQClientException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (MQBrokerException e) {
                                e.printStackTrace();
                            }
                            return value;
                        }
                    })
                            .tags(Tags.of(Tag.of("mq.type", "rocketMq"), Tag.of("mq.topic", topic), Tag.of("mq.consumerGroup", consumerGroup)))
                            .description("TPS监控，单位 数量/秒。")
                            .register(registry);


                    Gauge.builder("mq.rocketmq.totalDiff", defaultMQAdminExt, new ToDoubleFunction<DefaultMQAdminExt>() {
                        @Override
                        public double applyAsDouble(DefaultMQAdminExt defaultMQAdminExt) {
                            double value = 0;
                            try {
                                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
                                value = consumeStats.computeTotalDiff();
                            } catch (RemotingException e) {
                                e.printStackTrace();
                            } catch (MQClientException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (MQBrokerException e) {
                                e.printStackTrace();
                            }
                            return value;
                        }
                    })
                            .tags(Tags.of(Tag.of("mq.type", "rocketMq"), Tag.of("mq.topic", topic), Tag.of("mq.consumerGroup", consumerGroup)))
                            .description("积压量")
                            .register(registry);
                }
            }

        }

    }

}
